"""
RQ command line tool
"""

import logging
import logging.config
import os
import sys
import warnings
from typing import TYPE_CHECKING, List, Type, cast

import click
from redis.exceptions import ConnectionError

from rq import Retry
from rq import __version__ as version
from rq.cli.helpers import (
    parse_function_args,
    parse_schedule,
    pass_cli_config,
    read_config_file,
    refresh,
    setup_loghandlers_from_args,
    show_both,
    show_queues,
    show_workers,
)

# from rq.cli.pool import pool
from rq.defaults import (
    DEFAULT_JOB_MONITORING_INTERVAL,
    DEFAULT_LOGGING_DATE_FORMAT,
    DEFAULT_LOGGING_FORMAT,
    DEFAULT_MAINTENANCE_TASK_INTERVAL,
    DEFAULT_RESULT_TTL,
    DEFAULT_WORKER_TTL,
)
from rq.exceptions import InvalidJobOperationError
from rq.job import Job, JobStatus
from rq.logutils import blue
from rq.registry import FailedJobRegistry, clean_registries
from rq.serializers import DefaultSerializer
from rq.suspension import is_suspended
from rq.suspension import resume as connection_resume
from rq.suspension import suspend as connection_suspend
from rq.utils import get_call_string, import_attribute
from rq.worker import Worker
from rq.worker_pool import WorkerPool
from rq.worker_registration import clean_worker_registry

if TYPE_CHECKING:
    from rq.serializers import Serializer


@click.group()
@click.version_option(version)
def main():
    """RQ command line tool."""
    pass


@main.command()
@click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1)
@pass_cli_config
def empty(cli_config, all, queues, serializer, **options):
    """Empty given queues."""

    if all:
        queues = cli_config.queue_class.all(
            connection=cli_config.connection,
            job_class=cli_config.job_class,
            death_penalty_class=cli_config.death_penalty_class,
            serializer=serializer,
        )
    else:
        queues = [
            cli_config.queue_class(
                queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
            )
            for queue in queues
        ]

    if not queues:
        click.echo('Nothing to do')
        sys.exit(0)

    for queue in queues:
        num_jobs = queue.empty()
        click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name))


@main.command()
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.option('--queue', required=True, type=str)
@click.argument('job_ids', nargs=-1)
@pass_cli_config
def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options):
    """Requeue failed jobs."""

    failed_job_registry = FailedJobRegistry(
        queue, connection=cli_config.connection, job_class=job_class, serializer=serializer
    )
    if all:
        job_ids = failed_job_registry.get_job_ids()

    if not job_ids:
        click.echo('Nothing to do')
        sys.exit(0)

    click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids)))
    fail_count = 0
    with click.progressbar(job_ids) as job_ids:
        for job_id in job_ids:
            try:
                failed_job_registry.requeue(job_id)
            except InvalidJobOperationError:
                fail_count += 1

    if fail_count > 0:
        click.secho('Unable to requeue {0} jobs from failed job registry'.format(fail_count), fg='red')


@main.command()
@click.option('--interval', '-i', type=float, help="Updates stats every N seconds (default: don't poll)")
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info')
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
@click.argument('queues', nargs=-1)
@pass_cli_config
def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, **options):
    """RQ command-line monitor."""

    if only_queues:
        func = show_queues
    elif only_workers:
        func = show_workers
    else:
        func = show_both

    try:
        if queues:
            qs = []
            for queue_name in queues:
                qs.append(cli_config.queue_class(queue_name, connection=cli_config.connection))
        else:
            qs = cli_config.queue_class.all(connection=cli_config.connection)

        for queue in qs:
            clean_registries(queue)
            clean_worker_registry(queue)

        refresh(
            interval, func, qs, raw, by_queue, cli_config.queue_class, cli_config.worker_class, cli_config.connection
        )
    except ConnectionError as e:
        click.echo(e)
        sys.exit(1)
    except KeyboardInterrupt:
        click.echo()
        sys.exit(0)


@main.command()
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--logging_level', type=str, default=None, help='Set logging level')
@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Worker timeout to be used')
@click.option(
    '--maintenance-interval',
    type=int,
    default=DEFAULT_MAINTENANCE_TASK_INTERVAL,
    help='Maintenance task interval (in seconds) to be used',
)
@click.option(
    '--job-monitoring-interval',
    type=int,
    default=DEFAULT_JOB_MONITORING_INTERVAL,
    help='Default job monitoring interval to be used',
)
@click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help="Disable RQ's default exception handler")
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.option(
    '--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues'
)
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(
    cli_config,
    burst,
    logging_level,
    name,
    results_ttl,
    worker_ttl,
    maintenance_interval,
    job_monitoring_interval,
    disable_job_desc_logging,
    verbose,
    quiet,
    exception_handler,
    pid,
    disable_default_exception_handler,
    max_jobs,
    max_idle_time,
    with_scheduler,
    queues,
    log_format,
    date_format,
    serializer,
    dequeue_strategy,
    **options,
):
    """Starts an RQ worker."""
    settings = read_config_file(cli_config.config) if cli_config.config else {}
    # Worker specific default arguments
    queues = queues or settings.get('QUEUES', ['default'])
    name = name or settings.get('NAME')
    dict_config = settings.get('DICT_CONFIG')

    if dict_config:
        logging.config.dictConfig(dict_config)

    if pid:
        with open(os.path.expanduser(pid), 'w') as fp:
            fp.write(str(os.getpid()))

    worker_name = cli_config.worker_class.__qualname__
    if worker_name in ['RoundRobinWorker', 'RandomWorker']:
        strategy_alternative = 'random' if worker_name == 'RandomWorker' else 'round_robin'
        msg = f'WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead.'
        warnings.warn(msg, DeprecationWarning)
        click.secho(msg, fg='yellow')

    if dequeue_strategy not in ('default', 'random', 'round_robin'):
        click.secho(
            'ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.', err=True, fg='red'
        )
        sys.exit(1)

    setup_loghandlers_from_args(verbose, quiet, date_format, log_format)

    try:
        exception_handlers = []
        for h in exception_handler:
            exception_handlers.append(import_attribute(h))

        if is_suspended(cli_config.connection):
            click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
            sys.exit(1)

        queues = [
            cli_config.queue_class(
                queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
            )
            for queue in queues
        ]
        worker = cli_config.worker_class(
            queues,
            name=name,
            connection=cli_config.connection,
            default_worker_ttl=worker_ttl,  # TODO remove this arg in 2.0
            worker_ttl=worker_ttl,
            default_result_ttl=results_ttl,
            maintenance_interval=maintenance_interval,
            job_monitoring_interval=job_monitoring_interval,
            job_class=cli_config.job_class,
            queue_class=cli_config.queue_class,
            exception_handlers=exception_handlers or None,
            disable_default_exception_handler=disable_default_exception_handler,
            log_job_description=not disable_job_desc_logging,
            serializer=serializer,
        )

        # if --verbose or --quiet, override --logging_level
        if verbose or quiet:
            logging_level = None

        worker.work(
            burst=burst,
            logging_level=logging_level,
            date_format=date_format,
            log_format=log_format,
            max_jobs=max_jobs,
            max_idle_time=max_idle_time,
            with_scheduler=with_scheduler,
            dequeue_strategy=dequeue_strategy,
        )
    except ConnectionError as e:
        logging.error(e)
        sys.exit(1)


@main.command()
@click.option('--duration', help='Seconds you want the workers to be suspended.  Default is forever.', type=int)
@pass_cli_config
def suspend(cli_config, duration, **options):
    """Suspends all workers, to resume run `rq resume`"""

    if duration is not None and duration < 1:
        click.echo('Duration must be an integer greater than 1')
        sys.exit(1)

    connection_suspend(cli_config.connection, duration)

    if duration:
        msg = """Suspending workers for {0} seconds.  No new jobs will be started during that time, but then will
        automatically resume""".format(duration)
        click.echo(msg)
    else:
        click.echo('Suspending workers.  No new jobs will be started.  But current jobs will be completed')


@main.command()
@pass_cli_config
def resume(cli_config, **options):
    """Resumes processing of queues, that were suspended with `rq suspend`"""
    connection_resume(cli_config.connection)
    click.echo('Resuming workers.')


@main.command()
@click.option('--queue', '-q', help='The name of the queue.', default='default')
@click.option(
    '--timeout', help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.'
)
@click.option('--result-ttl', help='Specifies how long successful jobs and their results are kept.')
@click.option('--ttl', help='Specifies the maximum queued time of the job before it is discarded.')
@click.option('--failure-ttl', help='Specifies how long failed jobs are kept.')
@click.option('--description', help='Additional description of the job')
@click.option(
    '--depends-on', help='Specifies another job id that must complete before this job will be queued.', multiple=True
)
@click.option('--job-id', help='The id of this job')
@click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end')
@click.option('--retry-max', help='Maximum amount of retries', default=0, type=int)
@click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0])
@click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).')
@click.option(
    '--schedule-at',
    help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without '
    'timezone (e.g. 2021-05-27T21:45:00).',
)
@click.option('--quiet', is_flag=True, help='Only logs errors.')
@click.argument('function')
@click.argument('arguments', nargs=-1)
@pass_cli_config
def enqueue(
    cli_config,
    queue,
    timeout,
    result_ttl,
    ttl,
    failure_ttl,
    description,
    depends_on,
    job_id,
    at_front,
    retry_max,
    retry_interval,
    schedule_in,
    schedule_at,
    quiet,
    serializer,
    function,
    arguments,
    **options,
):
    """Enqueues a job from the command line"""
    args, kwargs = parse_function_args(arguments)
    function_string = get_call_string(function, args, kwargs)
    description = description or function_string

    retry = None
    if retry_max > 0:
        retry = Retry(retry_max, retry_interval)

    schedule = parse_schedule(schedule_in, schedule_at)

    queue = cli_config.queue_class(queue, serializer=serializer, connection=cli_config.connection)

    if schedule is None:
        job = queue.enqueue_call(
            function,
            args,
            kwargs,
            timeout,
            result_ttl,
            ttl,
            failure_ttl,
            description,
            depends_on,
            job_id,
            at_front,
            None,
            retry,
        )
    else:
        job = queue.create_job(
            function,
            args,
            kwargs,
            timeout,
            result_ttl,
            ttl,
            failure_ttl,
            description,
            depends_on,
            job_id,
            None,
            JobStatus.SCHEDULED,
            retry,
        )
        queue.schedule_job(job, schedule)

    if not quiet:
        click.echo("Enqueued %s with job-id '%s'." % (blue(function_string), job.id))


@main.command()
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--logging-level', '-l', type=str, default='INFO', help='Set logging level')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--log-format', type=str, default=DEFAULT_LOGGING_FORMAT, help='Set the format of the logs')
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
@click.option('--job-class', type=str, default=None, help='Dotted path to a Job class')
@click.argument('queues', nargs=-1)
@click.option('--num-workers', '-n', type=int, default=1, help='Number of workers to start')
@pass_cli_config
def worker_pool(
    cli_config,
    burst: bool,
    logging_level,
    queues,
    serializer,
    verbose,
    quiet,
    log_format,
    date_format,
    worker_class,
    job_class,
    num_workers,
    **options,
):
    """Starts a RQ worker pool"""
    settings = read_config_file(cli_config.config) if cli_config.config else {}
    # Worker specific default arguments
    queue_names: List[str] = queues or settings.get('QUEUES', ['default'])

    setup_loghandlers_from_args(verbose, quiet, date_format, log_format)

    if serializer:
        serializer_class = cast(Type['Serializer'], import_attribute(serializer))
    else:
        serializer_class = DefaultSerializer

    if worker_class:
        worker_class = import_attribute(worker_class)
    else:
        worker_class = Worker

    if job_class:
        job_class = import_attribute(job_class)
    else:
        job_class = Job

    # if --verbose or --quiet, override --logging_level
    if verbose or quiet:
        logging_level = None

    pool = WorkerPool(
        queue_names,
        connection=cli_config.connection,
        num_workers=num_workers,
        serializer=serializer_class,
        worker_class=worker_class,
        job_class=job_class,
    )
    pool.start(burst=burst, logging_level=logging_level)


if __name__ == '__main__':
    main()
